package com.burt.zookeeper.curator.queue.sync;

import com.burt.zookeeper.curator.queue.CommonHelper;
import com.burt.zookeeper.curator.queue.CustomDistributedQueue;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_ADDED;
import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_REMOVED;

/**
 * Description: 分布式队列，同步队列的实现
 * User: Burt
 * Date: 2017-08-21
 * Time: 13:55
 */
public class DistributedSyncQueue<T> implements CustomDistributedQueue<T> {

    private CuratorFramework curatorFramework;

    //根节点
    private final String root;

    private String NODE_NAME = "queue_sync";

    //队列大小达到多少后执行任务
    private int maxQueueSize;

    private String STARTPATH_ROOT = "/queue_sync";

    private String STARTPATH = STARTPATH_ROOT.concat("/start");

    public DistributedSyncQueue(CuratorFramework curatorFramework, String root, int maxQueueSize) {
        this.curatorFramework = curatorFramework;
        this.root = root;
        this.maxQueueSize = maxQueueSize;
        init();
    }

    /**
     * 初始化根目录
     */
    private void init(){
        try {
            //判断根目录是否存在
            Stat stat = curatorFramework.checkExists().forPath(root);
            if (null == stat){
                curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath(root);
            }
            //删除队列满的标志
            if (null != curatorFramework.checkExists().forPath(STARTPATH)) {
                curatorFramework.delete().forPath(STARTPATH);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 向队列提供数据，队列满的话会阻塞等待直到start标志位清除
     * @param element
     * @return
     */
    @Override
    public boolean offer(T element){

        //构建数据节点的完整路径
        String nodeFullPath = root.concat("/").concat(NODE_NAME);
        try {
            if (maxQueueSize > getSize()){
                //创建立持久的节点，写入数据
//                curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath(nodeFullPath,Object2Byte(element));
                curatorFramework.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(nodeFullPath, CommonHelper.Object2Byte(element));
                //再判断一次队列是否满
                if (maxQueueSize > getSize()){
                    //确保不存在
                    if (null != curatorFramework.checkExists().forPath(STARTPATH)) {
                        curatorFramework.delete().forPath(STARTPATH);
                    }
                } else {
                    if (null == curatorFramework.checkExists().forPath(STARTPATH)) {
                        curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(STARTPATH);
                    }
                }
            } else {
                //创建队列满的标记
                if (null == curatorFramework.checkExists().forPath(STARTPATH)){
                    curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(STARTPATH);
                }

                //curator监听事件得监听start的父节点才会有效
                CountDownLatch countDownLatch = new CountDownLatch(1);
                PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework,STARTPATH_ROOT,true);
                pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
                pathChildrenCache.getListenable().addListener((cf, pathChildrenCacheEvent) -> {
                    if (pathChildrenCacheEvent.getType() == CHILD_REMOVED){
                        System.out.println(STARTPATH + " 节点被删除");
                        countDownLatch.countDown();
                    }

                });

                //如果节点不存在会出现异常
                curatorFramework.checkExists().forPath(STARTPATH);
                countDownLatch.await();
                offer(element);

            }

        } catch (Exception e) {
            e.printStackTrace();
        }
        return  true;
    }

    /**
     * 从队列中获取数据
     * @return
     */
    @Override
    public T poll(){

        try {
            //队列还没满
            if (null == curatorFramework.checkExists().forPath(STARTPATH)){
                //curator监听事件得监听start的父节点才会有效
                CountDownLatch countDownLatch = new CountDownLatch(1);
                PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework,STARTPATH_ROOT,true);
                pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
                pathChildrenCache.getListenable().addListener((cf, pathChildrenCacheEvent) -> {
                    if (pathChildrenCacheEvent.getType() == CHILD_ADDED){
                        System.out.println(STARTPATH + " 节点被创建");
                        countDownLatch.countDown();
                    }
                });
                //如果节点不存在会出现异常
                curatorFramework.checkExists().forPath(STARTPATH);
                countDownLatch.await();
            }

            List<String> childrenNodes = curatorFramework.getChildren().forPath(root);
            //从小到大排序
            SortedSet<String> childrenSet = new TreeSet<String>();
            for (String node : childrenNodes) {
                //因为create成功返回的是全路径
                if (STARTPATH.equals(node)){
                    continue;
                }
                childrenSet.add(root.concat("/").concat(node));
            }

            for (String nodeName:childrenSet) {
                T node = (T) CommonHelper.Byte2Object(curatorFramework.getData().forPath(nodeName));
                curatorFramework.delete().forPath(nodeName);
                //删除start节点
                if (null != curatorFramework.checkExists().forPath(STARTPATH)) {
                    curatorFramework.delete().forPath(STARTPATH);
                }
                return node;
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

        return null;
    }

    /**
     * 获取队列的大小
     * @return
     * @throws Exception
     */
    public int getSize() throws Exception {
        return curatorFramework.getChildren().forPath(root).size();
    }

    /**
     * 判断队列是否为空
     * @return
     * @throws Exception
     */
    public boolean isEmpty() throws Exception {
        return curatorFramework.getChildren().forPath(root).isEmpty();
    }

}
